/*
 * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
 * one or more contributor license agreements. See the NOTICE file distributed
 * with this work for additional information regarding copyright ownership.
 * Licensed under the Camunda License 1.0. You may not use this file
 * except in compliance with the Camunda License 1.0.
 */
package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.ErrorCode;
import io.camunda.zeebe.transport.stream.impl.messages.ErrorResponse;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.StreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.UUIDEncoder;
import io.camunda.zeebe.util.CloseableSilently;
import java.util.UUID;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages mutating the stream registry via specific requests.
 *
 * <p>Every request is validated before the registry is touched: an invalid request is answered with
 * an {@link ErrorResponse}, logged at warn level, and leaves the registry unchanged.
 *
 * <p>The response objects are allocated once per handler and the same instances are returned from
 * every call; the error response in particular is overwritten on each failure. NOTE(review): this
 * presumably relies on callers consuming a response before issuing the next request against the
 * same handler - confirm against the call sites.
 *
 * @param <M> the metadata type of the registered streams
 */
public final class RemoteStreamApiHandler<M> implements CloseableSilently {
  private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamApiHandler.class);

  // differs from the UUID RFC, which describes the nil-UUID as (0L, 0L), but there is no real way
  // to configure SBE null values. at the same time, the chances of generating the same ID as this
  // are infinitesimally low. this also produces a UUID with version 0, which is not generated by
  // the standard library
  private static final UUID NULL_ID =
      new UUID(UUIDEncoder.highNullValue(), UUIDEncoder.lowNullValue());

  // operation names used in failure logs, so a rejected remove is not reported as a failed open
  private static final String OPERATION_OPEN = "open";
  private static final String OPERATION_REMOVE = "remove";

  // reused for every call instead of allocating a new response per request
  private final AddStreamResponse addResponseOK = new AddStreamResponse();
  private final ErrorResponse errorResponse = new ErrorResponse();
  private final RemoveStreamResponse removeResponseOK = new RemoveStreamResponse();

  private final RemoteStreamRegistry<M> registry;
  private final Function<DirectBuffer, M> metadataFactory;

  /**
   * @param registry the registry mutated by the handled requests
   * @param metadataFactory parses the raw metadata of an add request into its typed form; any
   *     exception it throws is reported to the sender as a malformed request
   */
  public RemoteStreamApiHandler(
      final RemoteStreamRegistry<M> registry, final Function<DirectBuffer, M> metadataFactory) {
    this.registry = registry;
    this.metadataFactory = metadataFactory;
  }

  /** Clears the underlying registry. */
  @Override
  public void close() {
    registry.clear();
  }

  /**
   * Registers the stream described by {@code request} on behalf of {@code sender}.
   *
   * <p>Checks are applied in this order, and the first failing one determines the response:
   *
   * <ol>
   *   <li>the metadata must be parseable by the metadata factory, otherwise {@link
   *       ErrorCode#MALFORMED}
   *   <li>the stream type must not be empty, otherwise {@link ErrorCode#INVALID}
   *   <li>the stream ID must be neither {@code null} nor the SBE null value, otherwise {@link
   *       ErrorCode#INVALID}
   * </ol>
   *
   * @param sender the member which sent the request
   * @param request the request to handle
   * @return the shared success response, or the shared error response describing the failure
   */
  public StreamResponse add(final MemberId sender, final AddStreamRequest request) {
    final M properties;

    try {
      properties = metadataFactory.apply(request.metadata());
    } catch (final Exception e) {
      final var errorMessage =
          "Failed to parse stream metadata (size = '%d') from AddStreamRequest"
              .formatted(request.metadata().capacity());
      return malformed(OPERATION_OPEN, sender, errorMessage, e);
    }

    final int streamTypeLength = request.streamType().capacity();
    if (streamTypeLength <= 0) {
      final String errorMessage =
          "Expected a stream type of length > 0, but it has %d".formatted(streamTypeLength);
      return invalid(OPERATION_OPEN, sender, errorMessage);
    }

    final UUID streamId = request.streamId();
    if (isNilId(streamId)) {
      return invalid(OPERATION_OPEN, sender, nilIdMessage(streamId));
    }

    registry.add(new UnsafeBuffer(request.streamType()), streamId, sender, properties);
    LOG.debug("Opened stream {} from {}", streamId, sender);
    return addResponseOK;
  }

  /**
   * Removes the stream identified by {@code request} on behalf of {@code sender}.
   *
   * @param sender the member which sent the request
   * @param request the request to handle
   * @return the shared success response, or the shared error response with {@link
   *     ErrorCode#INVALID} if the stream ID is {@code null} or the SBE null value
   */
  public StreamResponse remove(final MemberId sender, final RemoveStreamRequest request) {
    final UUID streamId = request.streamId();
    if (isNilId(streamId)) {
      return invalid(OPERATION_REMOVE, sender, nilIdMessage(streamId));
    }

    registry.remove(streamId, sender);
    LOG.debug("Removed stream {} from {}", streamId, sender);
    return removeResponseOK;
  }

  /**
   * Removes every stream registered by {@code sender}.
   *
   * @param sender the member whose streams are removed
   */
  public void removeAll(final MemberId sender) {
    registry.removeAll(sender);
    LOG.debug("Removed all streams from {}", sender);
  }

  /** Returns whether the given ID is absent, i.e. {@code null} or the SBE null value. */
  private static boolean isNilId(final UUID streamId) {
    return streamId == null || streamId.equals(NULL_ID);
  }

  /** Builds the error message sent back when a request carries no usable stream ID. */
  private static String nilIdMessage(final UUID streamId) {
    return "Expected a stream ID, but received a nil UUID ([%s])".formatted(streamId);
  }

  /**
   * Logs the failure together with its cause, and prepares the shared error response with {@link
   * ErrorCode#MALFORMED}.
   */
  private ErrorResponse malformed(
      final String operation,
      final MemberId sender,
      final String errorMessage,
      final Exception cause) {
    // the trailing exception has no placeholder, so SLF4J logs it as the throwable
    LOG.warn("Failed to {} stream for '{}': [{}]", operation, sender, errorMessage, cause);
    return errorResponse.code(ErrorCode.MALFORMED).message(errorMessage);
  }

  /** Logs the failure and prepares the shared error response with {@link ErrorCode#INVALID}. */
  private ErrorResponse invalid(
      final String operation, final MemberId sender, final String errorMessage) {
    LOG.warn("Failed to {} stream for '{}': [{}]", operation, sender, errorMessage);
    return errorResponse.code(ErrorCode.INVALID).message(errorMessage);
  }
}
